/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *      The postgres vacuum cleaner.
 *
 * This file now includes only control and dispatch code for VACUUM and
 * ANALYZE commands.  Regular VACUUM is implemented in vacuumlazy.c,
 * ANALYZE in analyze.c, and VACUUM FULL is a variant of CLUSTER, handled
 * in cluster.c.
 *
 *
 * Portions Copyright (c) 2012-2014, TransLattice, Inc.
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 * Portions Copyright (c) 2010-2012 Postgres-XC Development Group
 *
 * This source code file contains modifications made by THL A29 Limited ("Tencent Modifications").
 * All Tencent Modifications are Copyright (C) 2023 THL A29 Limited.
 *
 * IDENTIFICATION
 *      src/backend/commands/vacuum.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>

#include "access/clog.h"
#include "access/commit_ts.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/multixact.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_inherits_fn.h"
#include "catalog/pg_namespace.h"
#include "commands/cluster.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"

#ifdef _SHARDING_
#include "pgxc/shardmap.h"
#endif

#ifdef PGXC
#include "pgxc/pgxc.h"
#endif
#ifdef XCP
#include "access/visibilitymap.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "pgxc/execRemote.h"
#include "pgxc/planner.h"
#include "utils/lsyscache.h"
#endif /* XCP */

#ifdef _PG_ORCL_
#include "catalog/catalog.h"
#endif

#ifdef __OPENTENBASE__
#include "utils/ruleutils.h"
#endif

/*
 * GUC parameters
 *
 * The first four supply the default ages used by vacuum_set_xid_limits()
 * whenever its caller passes a negative value for the matching argument.
 * vacuum_defer_freeze_min_age is the age ExecVacuum() plugs into all four
 * VacuumParams freeze fields when the FREEZE option is given.
 */
int            vacuum_freeze_min_age;
int            vacuum_freeze_table_age;
int            vacuum_multixact_freeze_min_age;
int            vacuum_multixact_freeze_table_age;
int            vacuum_defer_freeze_min_age;

/*
 * A few variables that don't seem worth passing around as parameters.
 *
 * vac_context is created by vacuum() as a child of PortalContext and deleted
 * (and reset to NULL) at the end of vacuum(); get_rel_oids() allocates its
 * result list in it.  vac_strategy is the buffer access strategy chosen by
 * vacuum() and handed on to analyze_rel().
 */
static MemoryContext vac_context = NULL;
static BufferAccessStrategy vac_strategy;


/* non-export function prototypes */

/* Builds the list of relation OIDs to process; defined below. */
static List *get_rel_oids(Oid relid, const RangeVar *vacrel);
static void vac_truncate_clog(TransactionId frozenXID,
                  MultiXactId minMulti,
                  TransactionId lastSaneFrozenXid,
                  MultiXactId lastSaneMinMulti);
/* Per-relation VACUUM worker; vacuum() skips ANALYZE when it returns false. */
static bool vacuum_rel(Oid relid, RangeVar *relation, int options,
					   VacuumParams *params, StatSyncOpt *syncOpt);

/*
 * ExecVacuum -- entry point for user-issued VACUUM and ANALYZE statements
 *
 * This only translates the parsed statement into a VacuumParams struct and
 * then calls vacuum(), which does the real work.
 *
 * vacstmt      parsed VACUUM/ANALYZE statement (options, relation, column
 *              list and sync_option are read from it)
 * isTopLevel   passed through unchanged to vacuum()
 */
void
ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
{
    VacuumParams params;
    int         stmt_options = vacstmt->options;
    int         freeze_age;

    /* the option bitmask must be self-consistent by the time we get here */
    Assert(stmt_options & (VACOPT_VACUUM | VACOPT_ANALYZE));
    Assert((stmt_options & VACOPT_VACUUM) ||
           !(stmt_options & (VACOPT_FULL | VACOPT_FREEZE)));
    Assert((stmt_options & VACOPT_ANALYZE) || vacstmt->va_cols == NIL);
    Assert(!(stmt_options & VACOPT_SKIPTOAST));

    /*
     * With FREEZE, every freeze age is taken from the
     * vacuum_defer_freeze_min_age GUC.  Without it, -1 is passed, which
     * tells vacuum_set_xid_limits() to fall back to the per-age GUC defaults.
     */
    freeze_age = (stmt_options & VACOPT_FREEZE) ? vacuum_defer_freeze_min_age : -1;

    params.freeze_min_age = freeze_age;
    params.freeze_table_age = freeze_age;
    params.multixact_freeze_min_age = freeze_age;
    params.multixact_freeze_table_age = freeze_age;

    /* a manually issued vacuum is never an anti-wraparound vacuum ... */
    params.is_wraparound = false;

    /* ... and never makes use of the duration-logging threshold */
    params.log_min_duration = -1;

    /* no explicit OID and no caller-supplied buffer strategy */
    vacuum(stmt_options,
           vacstmt->relation,
           InvalidOid,
           &params,
           vacstmt->va_cols,
           NULL,
           isTopLevel,
           vacstmt->sync_option);
}

/*
 * Primary entry point for VACUUM and ANALYZE commands.
 *
 * options is a bitmask of VacuumOption flags, indicating what to do.
 *
 * relid, if not InvalidOid, indicate the relation to process; otherwise,
 * the RangeVar is used.  (The latter must always be passed, because it's
 * used for error messages.)
 *
 * params contains a set of parameters that can be used to customize the
 * behavior.
 *
 * va_cols is a list of columns to analyze, or NIL to process them all.
 *
 * bstrategy is normally given as NULL, but in autovacuum it can be passed
 * in to use the same buffer strategy object across multiple vacuum() calls.
 *
 * isTopLevel should be passed down from ProcessUtility.
 *
 * syncOpt is not interpreted here; it is handed unchanged to vacuum_rel()
 * and analyze_rel().
 *
 * It is the caller's responsibility that all parameters are allocated in a
 * memory context that will not disappear at transaction commit.
 */
void
vacuum(int					options,
	   RangeVar			*relation,
	   Oid					relid,
	   VacuumParams		*params,
	   List				*va_cols,
	   BufferAccessStrategy bstrategy,
	   bool					isTopLevel,
	   StatSyncOpt	  *syncOpt)
{
    const char *stmttype;
    volatile bool in_outer_xact,
                use_own_xacts;
    List       *relations;
    /* true while the per-relation loop below is running; see reentrancy check */
    static bool in_vacuum = false;

    Assert(params != NULL);

    /* command name used in the error messages raised below */
    stmttype = (options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";

    /*
     * We cannot run VACUUM inside a user transaction block; if we were inside
     * a transaction, then our commit- and start-transaction-command calls
     * would not have the intended effect!  There are numerous other subtle
     * dependencies on this, too.
     *
     * ANALYZE (without VACUUM) can run either way.
     */
    if (options & VACOPT_VACUUM)
    {
        PreventTransactionChain(isTopLevel, stmttype);
        in_outer_xact = false;
    }
    else
        in_outer_xact = IsInTransactionChain(isTopLevel);

    /*
     * Due to static variables vac_context, anl_context and vac_strategy,
     * vacuum() is not reentrant.  This matters when VACUUM FULL or ANALYZE
     * calls a hostile index expression that itself calls ANALYZE.
     */
    if (in_vacuum)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("%s cannot be executed from VACUUM or ANALYZE",
                        stmttype)));

    /*
     * Sanity check DISABLE_PAGE_SKIPPING option.
     */
    if ((options & VACOPT_FULL) != 0 &&
        (options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL")));

    /*
     * Send info about dead objects to the statistics collector, unless we are
     * in autovacuum --- autovacuum.c does this for itself.
     */
    if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
        pgstat_vacuum_stat();

    /*
     * Create special memory context for cross-transaction storage.
     *
     * Since it is a child of PortalContext, it will go away eventually even
     * if we suffer an error; there's no need for special abort cleanup logic.
     */
    vac_context = AllocSetContextCreate(PortalContext,
                                        "Vacuum",
                                        ALLOCSET_DEFAULT_SIZES);

    /*
     * If caller didn't give us a buffer strategy object, make one in the
     * cross-transaction memory context.
     */
    if (bstrategy == NULL)
    {
        MemoryContext old_context = MemoryContextSwitchTo(vac_context);

        bstrategy = GetAccessStrategy(BAS_VACUUM);
        MemoryContextSwitchTo(old_context);
    }
    vac_strategy = bstrategy;

    /*
     * Build the list of relation OIDs to process.  get_rel_oids() puts the
     * list in vac_context for safekeeping, so it survives the per-relation
     * transactions below.
     */
    relations = get_rel_oids(relid, relation);

    /*
     * Decide whether we need to start/commit our own transactions.
     *
     * For VACUUM (with or without ANALYZE): always do so, so that we can
     * release locks as soon as possible.  (We could possibly use the outer
     * transaction for a one-table VACUUM, but handling TOAST tables would be
     * problematic.)
     *
     * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
     * start/commit our own transactions.  Also, there's no need to do so if
     * only processing one relation.  For multiple relations when not within a
     * transaction block, and also in an autovacuum worker, use own
     * transactions so we can release locks sooner.
     */
    if (options & VACOPT_VACUUM)
        use_own_xacts = true;
    else
    {
        Assert(options & VACOPT_ANALYZE);
        if (IsAutoVacuumWorkerProcess())
            use_own_xacts = true;
        else if (in_outer_xact)
            use_own_xacts = false;
        else if (list_length(relations) > 1)
            use_own_xacts = true;
        else
            use_own_xacts = false;
    }

    /*
     * vacuum_rel expects to be entered with no transaction active; it will
     * start and commit its own transaction.  But we are called by an SQL
     * command, and so we are executing inside a transaction already. We
     * commit the transaction started in PostgresMain() here, and start
     * another one before exiting to match the commit waiting for us back in
     * PostgresMain().
     */
    if (use_own_xacts)
    {
        Assert(!in_outer_xact);

        /* ActiveSnapshot is not set by autovacuum */
        if (ActiveSnapshotSet())
            PopActiveSnapshot();

        /* matches the StartTransaction in PostgresMain() */
        CommitTransactionCommand();
    }

    /*
     * Turn vacuum cost accounting on or off, and process the relations.  The
     * PG_TRY block exists so that in_vacuum and VacuumCostActive are reset
     * even if processing a relation throws an error.
     */
    PG_TRY();
    {
        ListCell   *cur;

        in_vacuum = true;
        VacuumCostActive = (VacuumCostDelay > 0);
        VacuumCostBalance = 0;
        VacuumPageHit = 0;
        VacuumPageMiss = 0;
        VacuumPageDirty = 0;

        /*
         * Loop to process each selected relation.
         */
        foreach(cur, relations)
        {
            /*
             * NB: this local shadows the function's relid parameter; inside
             * the loop "relid" always means the current list entry.
             */
            Oid            relid = lfirst_oid(cur);

            if (options & VACOPT_VACUUM)
            {
                /* when vacuum_rel reports false, skip ANALYZE for this rel */
				if (!vacuum_rel(relid, relation, options, params, syncOpt))
                    continue;
            }

            if (options & VACOPT_ANALYZE)
            {
                /*
                 * If using separate xacts, start one for analyze. Otherwise,
                 * we can use the outer transaction.
                 */
                if (use_own_xacts)
                {
                    StartTransactionCommand();
                    /* functions in indexes may want a snapshot set */
                    PushActiveSnapshot(GetLocalTransactionSnapshot());
                }

                analyze_rel(relid, relation, options, params,
							va_cols, in_outer_xact, vac_strategy, syncOpt);

                if (use_own_xacts)
                {
                    PopActiveSnapshot();
                    CommitTransactionCommand();
                }
            }
        }
    }
    PG_CATCH();
    {
        /* undo the flags set above, then let the error propagate */
        in_vacuum = false;
        VacuumCostActive = false;
        PG_RE_THROW();
    }
    PG_END_TRY();

    /* normal (no-error) exit from the loop: same resets as in PG_CATCH */
    in_vacuum = false;
    VacuumCostActive = false;

    /*
     * Finish up processing.
     */
    if (use_own_xacts)
    {
        /* here, we are not in a transaction */

        /*
         * This matches the CommitTransaction waiting for us in
         * PostgresMain().
         */
        StartTransactionCommand();
    }

    if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
    {
        /*
         * Update pg_database.datfrozenxid, and truncate pg_xact if possible.
         * (autovacuum.c does this for itself.)
         */
        vac_update_datfrozenxid();
    }

    /*
     * Clean up working storage --- note we must do this after
     * StartTransactionCommand, else we might be trying to delete the active
     * context!
     */
    MemoryContextDelete(vac_context);
    vac_context = NULL;
}

/*
 * get_rel_oids -- build the list of relation OIDs that vacuum() will process
 *
 * relid    if valid, the one relation to process; vacrel is then ignored.
 * vacrel   otherwise, if non-NULL, the relation named in the command.  It is
 *          resolved without taking a lock; a partitioned table is expanded
 *          to the list returned by find_all_inheritors().
 *
 * When neither is supplied, pg_class is scanned and every plain table,
 * materialized view and partitioned table is returned, except rows whose
 * relpartkind is RELPARTKIND_CHILD.
 *
 * Returns an OID list (possibly NIL).  The list is built in vac_context so
 * that it will survive across our per-relation transactions.
 */
static List *
get_rel_oids(Oid relid, const RangeVar *vacrel)
{// #lizard forgives
    List       *oid_list = NIL;
    MemoryContext oldcontext;

    /* OID supplied by VACUUM's caller? */
    if (OidIsValid(relid))
    {
        oldcontext = MemoryContextSwitchTo(vac_context);
        oid_list = lappend_oid(oid_list, relid);
        MemoryContextSwitchTo(oldcontext);
    }
    else if (vacrel)
    {
        /* Process a specific relation */
        Oid            target_relid;
        HeapTuple    tuple;
        Form_pg_class classForm;
        bool        include_parts;

        /*
         * Since we don't take a lock here, the relation might be gone, or the
         * RangeVar might no longer refer to the OID we look up here.  In the
         * former case, VACUUM will do nothing; in the latter case, it will
         * process the OID we looked up here, rather than the new one. Neither
         * is ideal, but there's little practical alternative, since we're
         * going to commit this transaction and begin a new one between now
         * and then.
         */
        target_relid = RangeVarGetRelid(vacrel, NoLock, false);

        /*
         * To check whether the relation is a partitioned table, fetch its
         * syscache entry.
         */
        tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(target_relid));
        if (!HeapTupleIsValid(tuple))
            elog(ERROR, "cache lookup failed for relation %u", target_relid);
        classForm = (Form_pg_class) GETSTRUCT(tuple);
        include_parts = (classForm->relkind == RELKIND_PARTITIONED_TABLE);
        ReleaseSysCache(tuple);

        /*
         * Make relation list entries for this guy and its partitions, if any.
         * Note that the list returned by find_all_inheritors() includes the
         * passed-in OID at its head.  Also note that we did not request a
         * lock to be taken to match what would be done otherwise.
         */
        oldcontext = MemoryContextSwitchTo(vac_context);
        if (include_parts)
            oid_list = list_concat(oid_list,
                                   find_all_inheritors(target_relid, NoLock, NULL));
        else
            oid_list = lappend_oid(oid_list, target_relid);
        MemoryContextSwitchTo(oldcontext);
    }
    else
    {
        /*
         * Process all plain relations and materialized views listed in
         * pg_class
         */
        Relation    pgclass;
        HeapScanDesc scan;
        HeapTuple    tuple;

        pgclass = heap_open(RelationRelationId, AccessShareLock);

        scan = heap_beginscan_catalog(pgclass, 0, NULL);

        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
            Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);

            /*
             * We include partitioned tables here; depending on which
             * operation is to be performed, caller will decide whether to
             * process or ignore them.
             */
            if (classForm->relkind != RELKIND_RELATION &&
                classForm->relkind != RELKIND_MATVIEW &&
                classForm->relkind != RELKIND_PARTITIONED_TABLE)
                continue;

            /*
             * Skip rows flagged as RELPARTKIND_CHILD.
             * NOTE(review): presumably such children are reached through
             * their parent relation rather than on their own -- confirm.
             */
            if (classForm->relpartkind == RELPARTKIND_CHILD)
                continue;

            /* Make a relation list entry for this guy */
            oldcontext = MemoryContextSwitchTo(vac_context);
            oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
            MemoryContextSwitchTo(oldcontext);
        }

        heap_endscan(scan);
        heap_close(pgclass, AccessShareLock);
    }

    return oid_list;
}

/*
 * vacuum_set_xid_limits() -- work out the XID / MultiXactId cutoffs for a
 * vacuum of "rel"
 *
 * Inputs: the four *_age arguments are caller-requested ages; a negative
 * value means "use the corresponding vacuum_* GUC".
 *
 * Outputs:
 * - *oldestXmin: boundary between DEAD and RECENTLY_DEAD tuples (see
 *     HeapTupleSatisfiesVacuum).
 * - *freezeLimit: XIDs older than this get replaced by FrozenTransactionId.
 * - *xidFullScanLimit: a table whose relfrozenxid is older than this gets a
 *     full-table vacuum so tuples can be frozen everywhere; younger tables
 *     may be scanned partially.  Derived from freeze_table_age.
 * - *multiXactCutoff: MultiXactIds older than this are removed from Xmax.
 * - *mxactFullScanLimit: same idea as xidFullScanLimit, but compared with
 *     the table's relminmxid.
 *
 * xidFullScanLimit and mxactFullScanLimit must be both NULL or both
 * non-NULL; pass NULL for both when the full-scan limits are not wanted.
 */
void
vacuum_set_xid_limits(Relation rel,
                      int freeze_min_age,
                      int freeze_table_age,
                      int multixact_freeze_min_age,
                      int multixact_freeze_table_age,
                      TransactionId *oldestXmin,
                      TransactionId *freezeLimit,
                      TransactionId *xidFullScanLimit,
                      MultiXactId *multiXactCutoff,
                      MultiXactId *mxactFullScanLimit)
{// #lizard forgives
    int            xid_min_age;
    int            multi_min_age;
    int            multi_max_age;
    int            table_age;
    TransactionId xid_cutoff;
    TransactionId xid_floor;
    MultiXactId multi_cutoff;
    MultiXactId multi_floor;

    /*
     * Backends running lazy vacuum can be left out of the xmin computation:
     * the result only decides which tuples must be kept, and lazy vacuum
     * never stores its XID in a table.  Ignoring them could in theory be a
     * problem for a full vacuum, but only one vacuum can work on a given
     * table at a time and every vacuum runs in its own transaction.
     */
    *oldestXmin =
        TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, PROCARRAY_FLAGS_VACUUM), rel);

    Assert(TransactionIdIsNormal(*oldestXmin));

    /*
     * Minimum freeze age: the caller's value, else vacuum_freeze_min_age,
     * capped at half of autovacuum_freeze_max_age so that anti-wraparound
     * autovacuums are not triggered too often.
     */
    xid_min_age = (freeze_min_age < 0) ? vacuum_freeze_min_age : freeze_min_age;
    xid_min_age = Min(xid_min_age, autovacuum_freeze_max_age / 2);
    Assert(xid_min_age >= 0);

    /* Freeze cutoff; never let it land on a "permanent" XID */
    xid_cutoff = *oldestXmin - xid_min_age;
    if (!TransactionIdIsNormal(xid_cutoff))
        xid_cutoff = FirstNormalTransactionId;

    /*
     * When oldestXmin lags very far behind (in practice by more than
     * autovacuum_freeze_max_age / 2 XIDs), warn and behave as though the
     * minimum freeze age were zero.
     */
    xid_floor = ReadNewTransactionId() - autovacuum_freeze_max_age;
    if (!TransactionIdIsNormal(xid_floor))
        xid_floor = FirstNormalTransactionId;

    if (TransactionIdPrecedes(xid_cutoff, xid_floor))
    {
        ereport(WARNING,
                (errmsg("oldest xmin is far in the past"),
                 errhint("Close open transactions soon to avoid wraparound problems.")));
        xid_cutoff = *oldestXmin;
    }

    *freezeLimit = xid_cutoff;

    /*
     * Multixact age at which freezing becomes urgent.  Usually this is
     * autovacuum_multixact_freeze_max_age, but it can be lower when
     * multixact member space is running short.
     */
    multi_max_age = MultiXactMemberFreezeThreshold();

    /*
     * Minimum multixact freeze age: the caller's value, else
     * vacuum_multixact_freeze_min_age, capped at half of the urgent age so
     * that MultiXact anti-wraparound autovacuums stay infrequent.
     */
    multi_min_age = (multixact_freeze_min_age < 0)
        ? vacuum_multixact_freeze_min_age
        : multixact_freeze_min_age;
    multi_min_age = Min(multi_min_age,
                        multi_max_age / 2);
    Assert(multi_min_age >= 0);

    /* Multixact cutoff, clamped so that the value stays valid */
    multi_cutoff = GetOldestMultiXactId() - multi_min_age;
    if (multi_cutoff < FirstMultiXactId)
        multi_cutoff = FirstMultiXactId;

    multi_floor =
        ReadNextMultiXactId() - multi_max_age;
    if (multi_floor < FirstMultiXactId)
        multi_floor = FirstMultiXactId;

    if (MultiXactIdPrecedes(multi_cutoff, multi_floor))
    {
        ereport(WARNING,
                (errmsg("oldest multixact is far in the past"),
                 errhint("Close open transactions with multixacts soon to avoid wraparound problems.")));
        multi_cutoff = multi_floor;
    }

    *multiXactCutoff = multi_cutoff;

    /* Caller not interested in the full-scan limits?  Then we are done. */
    if (xidFullScanLimit == NULL)
    {
        Assert(mxactFullScanLimit == NULL);
        return;
    }

    Assert(mxactFullScanLimit != NULL);

    /*
     * Table freeze age: the caller's value, else vacuum_freeze_table_age,
     * but at most autovacuum_freeze_max_age * 0.95.  The cap gives e.g. a
     * nightly VACUUM job the chance to freeze tuples before an
     * anti-wraparound autovacuum would be launched.
     */
    table_age = (freeze_table_age < 0) ? vacuum_freeze_table_age : freeze_table_age;
    table_age = Min(table_age, autovacuum_freeze_max_age * 0.95);
    Assert(table_age >= 0);

    /* XID limit forcing a full-table vacuum; avoid "permanent" XIDs */
    xid_cutoff = ReadNewTransactionId() - table_age;
    if (!TransactionIdIsNormal(xid_cutoff))
        xid_cutoff = FirstNormalTransactionId;

    *xidFullScanLimit = xid_cutoff;

    /*
     * Likewise for multixacts: the caller's value, else
     * vacuum_multixact_freeze_table_age, but at most 0.95 times the urgent
     * multixact age computed above, again so that a scheduled VACUUM can get
     * in ahead of the anti-wraparound autovacuum.
     */
    table_age = (multixact_freeze_table_age < 0)
        ? vacuum_multixact_freeze_table_age
        : multixact_freeze_table_age;
    table_age = Min(table_age,
                    multi_max_age * 0.95);
    Assert(table_age >= 0);

    /* MultiXact limit forcing a full-table vacuum, clamped to stay valid */
    multi_cutoff = ReadNextMultiXactId() - table_age;
    if (multi_cutoff < FirstMultiXactId)
        multi_cutoff = FirstMultiXactId;

    *mxactFullScanLimit = multi_cutoff;
}

/*
 * vac_estimate_reltuples() -- compute the value to store in
 * pg_class.reltuples after a (possibly partial) scan
 *
 *        A complete scan yields an exact answer, so the observed live-tuple
 *        count is returned untouched.  After a partial scan the count is not
 *        trusted blindly -- VACUUM in particular may have visited a far from
 *        random set of pages -- so the previous pg_class.reltuples value is
 *        used as the tuple-density measurement for the pages not visited.
 *
 *        relation       relation whose stored relpages/reltuples are read
 *        is_analyze     not consulted by the body
 *        total_pages    current size of the relation in pages
 *        scanned_pages  number of pages actually examined
 *        scanned_tuples live tuples found in those pages
 *
 *		Note: scanned_tuples should count only *live* tuples, since
 *		pg_class.reltuples is defined that way.
 */
double
vac_estimate_reltuples(Relation relation, bool is_analyze,
                       BlockNumber total_pages,
                       BlockNumber scanned_pages,
                       double scanned_tuples)
{
    BlockNumber prev_pages = relation->rd_rel->relpages;
    double        prev_tuples = relation->rd_rel->reltuples;
    double        prev_density;
    double        seen_density;
    double        weight;
    double        blended_density;

    /* Whole table seen: the observed count is the answer */
    if (scanned_pages >= total_pages)
        return scanned_tuples;

    /*
     * Nothing scanned although the table has pages: no new information, so
     * hand back the stored value.  (Callers should avoid updating the
     * pg_class statistics in this situation.)
     */
    if (scanned_pages == 0)
        return prev_tuples;

    /*
     * Stored relpages of zero leaves the old density undefined; all we can
     * do is extrapolate the observed density over the whole table.
     */
    if (prev_pages == 0)
        return floor((scanned_tuples / scanned_pages) * total_pages + 0.5);

    /*
     * General case.  Turn the stored figures into a density (tuples per
     * page), move that density towards the newly observed one using an
     * exponential moving average, and scale the result by total_pages.
     *
     * The averaging weight is simply the fraction of pages scanned, which
     * amounts to assuming the unscanned pages kept their old density.  For
     * ANALYZE that assumption is probably not quite true when the new
     * measurement differs, but repeated runs that keep measuring the same
     * density make reltuples converge on the right value.  For VACUUM the
     * scanned pages are not a random sample, yet the pages it skipped are
     * exactly those that have not changed lately, so treating them as still
     * having the old density is reasonable there too.
     *
     * This logic could probably use further refinement.
     */
    prev_density = prev_tuples / prev_pages;
    seen_density = scanned_tuples / scanned_pages;
    weight = (double) scanned_pages / (double) total_pages;
    blended_density = prev_density + (seen_density - prev_density) * weight;

    return floor(blended_density * total_pages + 0.5);
}


/*
 *    vac_update_relstats() -- store new whole-relation statistics in the
 *    relation's pg_class row
 *
 *        These are the statistics kept directly in pg_class; ANALYZE updates
 *        further stats elsewhere, but these are refreshed every time.  The
 *        routine serves heap and index pg_class entries alike and is shared
 *        by VACUUM and ANALYZE.
 *
 *        The row is overwritten in place, which breaks normal transaction
 *        semantics.  That is tolerable provided the new values are right
 *        whether or not this transaction commits.  It is done this way
 *        because ordinary updates would make vacuuming pg_class itself work
 *        poorly: by the end of a vacuum cycle most pg_class tuples would
 *        already be obsolete.  In-place overwriting only works for
 *        fixed-size not-null columns, which is what these are.
 *
 *        A second reason: a lazy VACUUM running with PROC_IN_VACUUM set must
 *        not perform regular updates, since somebody vacuuming pg_class
 *        might conclude that a tuple with xmin = our xid can be removed.
 *
 *        Apart from inherently nontransactional figures such as relpages and
 *        relallvisible, lazily maintained DDL flags such as relhasindex are
 *        cleared here when they are no longer accurate.  VACUUM may do this
 *        safely: it cannot run concurrently with CREATE INDEX/RULE/TRIGGER
 *        and cannot be part of a transaction block.  An ANALYZE inside an
 *        outer transaction may not: that transaction could, say, have
 *        dropped the last index and later roll back, leaving relhasindex
 *        wrongly cleared.  Hence the flags are left alone when in_outer_xact
 *        is true; putting off flag maintenance is always permissible.
 *
 *        frozenxid / minmulti may be passed as invalid values to indicate
 *        that the caller has nothing new for relfrozenxid / relminmxid.
 *
 *		Note: num_tuples should count only *live* tuples, since
 *		pg_class.reltuples is defined that way.
 */
void
vac_update_relstats(Relation relation,
                    BlockNumber num_pages,
                    double num_tuples,
                    BlockNumber num_all_visible_pages,
                    bool hasindex, TransactionId frozenxid,
                    MultiXactId minmulti,
                    bool in_outer_xact)
{// #lizard forgives
    Oid            target_oid = RelationGetRelid(relation);
    Relation    pg_class_rel;
    HeapTuple    class_tup;
    Form_pg_class class_form;
    bool        changed = false;

    pg_class_rel = heap_open(RelationRelationId, RowExclusiveLock);

    /* Work on a private copy of the row; it is written back at the end */
    class_tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(target_oid));
    if (!HeapTupleIsValid(class_tup))
        elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
             target_oid);
    class_form = (Form_pg_class) GETSTRUCT(class_tup);

    /* --- statistics: touch a field only when its value really differs --- */

    if (class_form->relpages != (int32) num_pages)
    {
        class_form->relpages = (int32) num_pages;
        changed = true;
    }

    if (class_form->reltuples != (float4) num_tuples)
    {
        class_form->reltuples = (float4) num_tuples;
        changed = true;
    }

    if (class_form->relallvisible != (int32) num_all_visible_pages)
    {
        class_form->relallvisible = (int32) num_all_visible_pages;
        changed = true;
    }

    /* --- DDL flags: never inside an outer transaction (see header) --- */

    if (!in_outer_xact)
    {
        if (!hasindex)
        {
            /* No indexes were found, so relhasindex must go ... */
            if (class_form->relhasindex)
            {
                class_form->relhasindex = false;
                changed = true;
            }

            /*
             * ... and without indexes there cannot be a primary key either.
             * This could be done more thoroughly...
             */
            if (class_form->relhaspkey)
            {
                class_form->relhaspkey = false;
                changed = true;
            }
        }

        /* Likewise drop stale relhasrules / relhastriggers */
        if (relation->rd_rules == NULL && class_form->relhasrules)
        {
            class_form->relhasrules = false;
            changed = true;
        }

        if (relation->trigdesc == NULL && class_form->relhastriggers)
        {
            class_form->relhastriggers = false;
            changed = true;
        }
    }

    /*
     * relfrozenxid is updated unless the caller passed a non-normal XID to
     * say it has no new data.
     *
     * Normally the stored value is not allowed to move backwards: a new
     * frozenxid that is older can only mean an earlier VACUUM used a tighter
     * freeze_min_age, and its work should not be forgotten.  A stored value
     * that lies "in the future", however, must be corrupt, and overwriting
     * it with the cutoff used this time seems best.  The notion of "in the
     * future" should match the one in vac_update_datfrozenxid().
     */
    if (TransactionIdIsNormal(frozenxid) &&
        class_form->relfrozenxid != frozenxid)
    {
        if (TransactionIdPrecedes(class_form->relfrozenxid, frozenxid) ||
            TransactionIdPrecedes(ReadNewTransactionId(),
                                  class_form->relfrozenxid))
        {
            class_form->relfrozenxid = frozenxid;
            changed = true;
        }
    }

    /* The same policy applies to relminmxid */
    if (MultiXactIdIsValid(minmulti) &&
        class_form->relminmxid != minmulti)
    {
        if (MultiXactIdPrecedes(class_form->relminmxid, minmulti) ||
            MultiXactIdPrecedes(ReadNextMultiXactId(), class_form->relminmxid))
        {
            class_form->relminmxid = minmulti;
            changed = true;
        }
    }

    /* Overwrite the on-disk row only if some field was modified */
    if (changed)
        heap_inplace_update(pg_class_rel, class_tup);

    heap_close(pg_class_rel, RowExclusiveLock);
}


/*
 *    vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
 *
 *        Update pg_database's datfrozenxid entry for our database to be the
 *        minimum of the pg_class.relfrozenxid values.
 *
 *        Similarly, update our datminmxid to be the minimum of the
 *        pg_class.relminmxid values.
 *
 *        If we are able to advance either pg_database value, also try to
 *        truncate pg_xact and pg_multixact.
 *
 *        We violate transaction semantics here by overwriting the database's
 *        existing pg_database tuple with the new values.  This is reasonably
 *        safe since the new values are correct whether or not this transaction
 *        commits.  As with vac_update_relstats, this avoids leaving dead tuples
 *        behind after a VACUUM.
 */
void
vac_update_datfrozenxid(void)
{// #lizard forgives
    HeapTuple    tuple;
    Form_pg_database dbform;
    Relation    relation;
    SysScanDesc scan;
    HeapTuple    classTup;
    TransactionId newFrozenXid;
    MultiXactId newMinMulti;
    TransactionId lastSaneFrozenXid;
    MultiXactId lastSaneMinMulti;
    bool        bogus = false;    /* set if any pg_class row looks "in the future" */
    bool        dirty = false;    /* set if the pg_database tuple was modified */

    /*
     * Initialize the "min" calculation with GetOldestXmin, which is a
     * reasonable approximation to the minimum relfrozenxid for not-yet-
     * committed pg_class entries for new tables; see AddNewRelationTuple().
     * So we cannot produce a wrong minimum by starting with this.
     */
    newFrozenXid = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);

    /*
     * Similarly, initialize the MultiXact "min" with the value that would be
     * used on pg_class for new tables.  See AddNewRelationTuple().
     */
    newMinMulti = GetOldestMultiXactId();

    /*
     * Identify the latest relfrozenxid and relminmxid values that we could
     * validly see during the scan.  These are conservative values, but it's
     * not really worth trying to be more exact.
     */
    lastSaneFrozenXid = ReadNewTransactionId();
    lastSaneMinMulti = ReadNextMultiXactId();

    /*
     * We must seqscan pg_class to find the minimum Xid, because there is no
     * index that can help us here.
     */
    relation = heap_open(RelationRelationId, AccessShareLock);

    /* InvalidOid index + indexOK=false + zero keys: an unkeyed scan of every row */
    scan = systable_beginscan(relation, InvalidOid, false,
                              NULL, 0, NULL);

    while ((classTup = systable_getnext(scan)) != NULL)
    {
        Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);

        /*
         * Only consider relations able to hold unfrozen XIDs (anything else
         * should have InvalidTransactionId in relfrozenxid anyway.)
         */
        if (classForm->relkind != RELKIND_RELATION &&
            classForm->relkind != RELKIND_MATVIEW &&
            classForm->relkind != RELKIND_TOASTVALUE)
            continue;

        Assert(TransactionIdIsNormal(classForm->relfrozenxid));
        Assert(MultiXactIdIsValid(classForm->relminmxid));

        /*
         * If things are working properly, no relation should have a
         * relfrozenxid or relminmxid that is "in the future".  However, such
         * cases have been known to arise due to bugs in pg_upgrade.  If we
         * see any entries that are "in the future", chicken out and don't do
         * anything.  This ensures we won't truncate clog before those
         * relations have been scanned and cleaned up.
         */
        if (TransactionIdPrecedes(lastSaneFrozenXid, classForm->relfrozenxid) ||
            MultiXactIdPrecedes(lastSaneMinMulti, classForm->relminmxid))
        {
            bogus = true;
            break;
        }

        /* Track the running minimum of both horizons across all relations */
        if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
            newFrozenXid = classForm->relfrozenxid;

        if (MultiXactIdPrecedes(classForm->relminmxid, newMinMulti))
            newMinMulti = classForm->relminmxid;
    }

    /* we're done with pg_class */
    systable_endscan(scan);
    heap_close(relation, AccessShareLock);

    /*
     * chicken out if bogus data found.  Note that returning here leaves
     * pg_database untouched and also skips the vac_truncate_clog() call at
     * the bottom of this function.
     */
    if (bogus)
        return;

    Assert(TransactionIdIsNormal(newFrozenXid));
    Assert(MultiXactIdIsValid(newMinMulti));

    /* Now fetch the pg_database tuple we need to update. */
    relation = heap_open(DatabaseRelationId, RowExclusiveLock);

    /* Fetch a copy of the tuple to scribble on */
    tuple = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
    dbform = (Form_pg_database) GETSTRUCT(tuple);

    /*
     * As in vac_update_relstats(), we ordinarily don't want to let
     * datfrozenxid go backward; but if it's "in the future" then it must be
     * corrupt and it seems best to overwrite it.
     *
     * When we do not overwrite, newFrozenXid is reset to the stored value so
     * that the value handed to vac_truncate_clog() below is the one actually
     * recorded in pg_database.
     */
    if (dbform->datfrozenxid != newFrozenXid &&
        (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid) ||
         TransactionIdPrecedes(lastSaneFrozenXid, dbform->datfrozenxid)))

    {
        dbform->datfrozenxid = newFrozenXid;
        dirty = true;
    }
    else
        newFrozenXid = dbform->datfrozenxid;

    /* Ditto for datminmxid */
    if (dbform->datminmxid != newMinMulti &&
        (MultiXactIdPrecedes(dbform->datminmxid, newMinMulti) ||
         MultiXactIdPrecedes(lastSaneMinMulti, dbform->datminmxid)))
    {
        dbform->datminmxid = newMinMulti;
        dirty = true;
    }
    else
        newMinMulti = dbform->datminmxid;

    /* Overwrite the existing tuple in place only if one of the fields moved */
    if (dirty)
        heap_inplace_update(relation, tuple);

    heap_freetuple(tuple);
    heap_close(relation, RowExclusiveLock);

    /*
     * If we were able to advance datfrozenxid or datminmxid, see if we can
     * truncate pg_xact and/or pg_multixact.  Also do it if the shared
     * XID-wrap-limit info is stale, since this action will update that too.
     */
    if (dirty || ForceTransactionIdLimitUpdate())
        vac_truncate_clog(newFrozenXid, newMinMulti,
                          lastSaneFrozenXid, lastSaneMinMulti);
}


/*
 *    vac_truncate_clog() -- attempt to truncate the commit log
 *
 *        Scan pg_database to determine the system-wide oldest datfrozenxid,
 *        and use it to truncate the transaction commit log (pg_xact).
 *        Also update the XID wrap limit info maintained by varsup.c.
 *        Likewise for datminmxid.
 *
 *        The passed frozenXID and minMulti are the updated values for my own
 *        pg_database entry. They're used to initialize the "min" calculations.
 *        The caller also passes the "last sane" XID and MXID, since it has
 *        those at hand already.
 *
 *        This routine is only invoked when we've managed to change our
 *        DB's datfrozenxid/datminmxid values, or we found that the shared
 *        XID-wrap-limit info is stale.
 *
 *        Parameters:
 *          frozenXID          starting value for the minimum datfrozenxid
 *          minMulti           starting value for the minimum datminmxid
 *          lastSaneFrozenXid  any datfrozenxid that follows this is treated
 *                             as "in the future" (bogus)
 *          lastSaneMinMulti   likewise for datminmxid
 *
 *        Nothing is truncated and no limits are updated if any database row
 *        looks wrapped or bogus; the function simply returns in those cases.
 */
static void
vac_truncate_clog(TransactionId frozenXID,
                  MultiXactId minMulti,
                  TransactionId lastSaneFrozenXid,
                  MultiXactId lastSaneMinMulti)
{// #lizard forgives
    TransactionId nextXID = ReadNewTransactionId();
    Relation    relation;
    HeapScanDesc scan;
    HeapTuple    tuple;
    Oid            oldestxid_datoid;    /* database owning the minimum datfrozenxid */
    Oid            minmulti_datoid;    /* database owning the minimum datminmxid */
    bool        bogus = false;
    bool        frozenAlreadyWrapped = false;

    /* init oldest datoids to sync with my frozenXID/minMulti values */
    oldestxid_datoid = MyDatabaseId;
    minmulti_datoid = MyDatabaseId;

    /*
     * Scan pg_database to compute the minimum datfrozenxid/datminmxid
     *
     * Since vac_update_datfrozenxid updates datfrozenxid/datminmxid in-place,
     * the values could change while we look at them.  Fetch each one just
     * once to ensure sane behavior of the comparison logic.  (Here, as in
     * many other places, we assume that fetching or updating an XID in shared
     * storage is atomic.)
     *
     * Note: we need not worry about a race condition with new entries being
     * inserted by CREATE DATABASE.  Any such entry will have a copy of some
     * existing DB's datfrozenxid, and that source DB cannot be ours because
     * of the interlock against copying a DB containing an active backend.
     * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
     * concurrently modify the datfrozenxid's of different databases, the
     * worst possible outcome is that pg_xact is not truncated as aggressively
     * as it could be.
     */
    relation = heap_open(DatabaseRelationId, AccessShareLock);

    scan = heap_beginscan_catalog(relation, 0, NULL);

    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        /*
         * The volatile qualifier plus the two local copies below implement
         * the "fetch each one just once" rule described above.
         */
        volatile FormData_pg_database *dbform = (Form_pg_database) GETSTRUCT(tuple);
        TransactionId datfrozenxid = dbform->datfrozenxid;
        TransactionId datminmxid = dbform->datminmxid;

        Assert(TransactionIdIsNormal(datfrozenxid));
        Assert(MultiXactIdIsValid(datminmxid));

        /*
         * If things are working properly, no database should have a
         * datfrozenxid or datminmxid that is "in the future".  However, such
         * cases have been known to arise due to bugs in pg_upgrade.  If we
         * see any entries that are "in the future", chicken out and don't do
         * anything.  This ensures we won't truncate clog before those
         * databases have been scanned and cleaned up.  (We will issue the
         * "already wrapped" warning if appropriate, though.)
         */
        if (TransactionIdPrecedes(lastSaneFrozenXid, datfrozenxid) ||
            MultiXactIdPrecedes(lastSaneMinMulti, datminmxid))
            bogus = true;

        /*
         * A datfrozenxid that follows the next XID to be assigned means
         * wraparound; otherwise fold it into the running minimum and remember
         * which database it came from.
         */
        if (TransactionIdPrecedes(nextXID, datfrozenxid))
            frozenAlreadyWrapped = true;
        else if (TransactionIdPrecedes(datfrozenxid, frozenXID))
        {
            frozenXID = datfrozenxid;
            oldestxid_datoid = HeapTupleGetOid(tuple);
        }

        if (MultiXactIdPrecedes(datminmxid, minMulti))
        {
            minMulti = datminmxid;
            minmulti_datoid = HeapTupleGetOid(tuple);
        }
    }

    heap_endscan(scan);

    heap_close(relation, AccessShareLock);

    /*
     * Do not truncate CLOG if we seem to have suffered wraparound already;
     * the computed minimum XID might be bogus.  This case should now be
     * impossible due to the defenses in GetNewTransactionId, but we keep the
     * test anyway.
     */
    if (frozenAlreadyWrapped)
    {
        ereport(WARNING,
                (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
                 errdetail("You might have already suffered transaction-wraparound data loss.")));

        /*
         * NOTE(review): a PGXC-era comment here said "don't return, we want
         * to update the transaction limits anyways", but the code returns
         * unconditionally, so neither truncation nor the limit updates below
         * happen in this case.  Confirm which behavior is intended.
         */
        return;
    }

    /* chicken out if data is bogus in any other way */
    if (bogus)
        return;

    /*
     * Advance the oldest value for commit timestamps before truncating, so
     * that if a user requests a timestamp for a transaction we're truncating
     * away right after this point, they get NULL instead of an ugly "file not
     * found" error from slru.c.  This doesn't matter for xact/multixact
     * because they are not subject to arbitrary lookups from users.
     */
    AdvanceOldestCommitTsXid(frozenXID);

    /*
     * Truncate CLOG, multixact and CommitTs to the oldest computed value.
     */
    TruncateCLOG(frozenXID, oldestxid_datoid);
    TruncateCommitTs(frozenXID);
    TruncateMultiXact(minMulti, minmulti_datoid);

    /*
     * Update the wrap limit for GetNewTransactionId and creation of new
     * MultiXactIds.  Note: these functions will also signal the postmaster
     * for an(other) autovac cycle if needed.   XXX should we avoid possibly
     * signalling twice?
     */
    SetTransactionIdLimit(frozenXID, oldestxid_datoid);
    SetMultiXactIdLimit(minMulti, minmulti_datoid, false);
}


/*
 *    vacuum_rel() -- vacuum one heap relation
 *
 *        Doing one heap at a time incurs extra overhead, since we need to
 *        check that the heap exists again just before we vacuum it.  The
 *        reason that we do this is so that vacuuming can be spread across
 *        many small transactions.  Otherwise, two-phase locking would require
 *        us to lock the entire database during one pass of the vacuum cleaner.
 *
 *        At entry and exit, we are not inside a transaction.
 *
 *        Parameters:
 *          relid     OID of the relation to process
 *          relation  RangeVar of the target; only its relname is read here
 *                    (for the "lock not available" log message), and it is
 *                    passed along unchanged to recursive calls
 *          options   bitmask of VACOPT_* flags
 *          params    vacuum parameters; must not be NULL
 *          syncOpt   passed through to vacuum_rel_coordinator() and to
 *                    recursive calls; not otherwise examined here
 *
 *        Returns true if the relation was processed, false if it was skipped
 *        (vanished, lock unavailable, permission denied, not vacuumable, or a
 *        temp table of another backend).
 */
static bool
vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params, StatSyncOpt *syncOpt)
{
    LOCKMODE    lmode;
    Relation    onerel;
    LockRelId    onerelid;
    Oid            toast_relid;
    Oid            save_userid;
    int            save_sec_context;
    int            save_nestlevel;
#ifdef __OPENTENBASE__
    bool        part_vacuum_result = true;
    List       *childs = NULL;
    List       *new_childs = NULL;
    Oid            child;
    ListCell   *lc;
    MemoryContext oldmctx;
#endif


    Assert(params != NULL);

#ifdef __OPENTENBASE__
    /*
     * Outside autovacuum, an interval-partitioned parent first has each of
     * its partitions vacuumed (each in its own transaction, via recursion)
     * before the parent itself is processed below.
     */
    if (!IsAutoVacuumWorkerProcess())
    {
        StartTransactionCommand();
        /* functions in indexes may want a snapshot set */
        PushActiveSnapshot(GetLocalTransactionSnapshot());

        onerel = try_relation_open(relid, AccessShareLock);
        if (!onerel)
        {
            PopActiveSnapshot();
            CommitTransactionCommand();
            return false;
        }

        if (RELATION_IS_INTERVAL(onerel))
        {
            childs = RelationGetAllPartitions(onerel);

            /*
             * The copy must outlive this transaction, so build it in
             * vac_context rather than in transaction-local memory.
             */
            oldmctx = MemoryContextSwitchTo(vac_context);
            new_childs = list_copy(childs);
            MemoryContextSwitchTo(oldmctx);

            /* list_free releases the cells too; pfree would only drop the header */
            list_free(childs);
            childs = NULL;

            /*
             * Hold a session-level lock on the parent while its partitions
             * are processed in separate transactions.  Take it only when
             * there is at least one partition: the matching unlock below is
             * reached only in that case, and a session lock is not released
             * by transaction commit, so taking it unconditionally would leak
             * it for an empty partition list.
             */
            if (new_childs)
            {
                onerelid = onerel->rd_lockInfo.lockRelId;
                LockRelationIdForSession(&onerelid, RowExclusiveLock);
            }
        }

        relation_close(onerel, AccessShareLock);
        onerel = NULL;

        PopActiveSnapshot();
        CommitTransactionCommand();

        if (new_childs)
        {
            /*
             * NOTE(review): part_vacuum_result is overwritten on every
             * iteration, so only the outcome of the last partition decides
             * whether we bail out below.  Confirm whether that is intended.
             */
            foreach (lc, new_childs)
            {
                child = lfirst_oid(lc);
                part_vacuum_result = vacuum_rel(child, relation, options, params, syncOpt);
            }
            UnlockRelationIdForSession(&onerelid, RowExclusiveLock);

            /* free header and cells; this list lives in long-lived vac_context */
            list_free(new_childs);

            if (!part_vacuum_result)
            {
                return false;
            }
        }
    }
#endif
    /* Begin a transaction for vacuuming this relation */
    StartTransactionCommand();


    /* In Postgres-XC, take a snapshot after setting the vacuum flags */
    /*
     * Functions in indexes may want a snapshot set.  Also, setting a snapshot
     * ensures that RecentGlobalXmin is kept truly recent.
     */
    PushActiveSnapshot(GetLocalTransactionSnapshot());

    if (!(options & VACOPT_FULL))
    {
        /*
         * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
         * other concurrent VACUUMs know that they can ignore this one while
         * determining their OldestXmin.  (The reason we don't set it during a
         * full VACUUM is exactly that we may have to run user-defined
         * functions for functional indexes, and we want to make sure that if
         * they use the snapshot set above, any tuples it requires can't get
         * removed from other tables.  An index function that depends on the
         * contents of other tables is arguably broken, but we won't break it
         * here by violating transaction semantics.)
         *
         * We also set the VACUUM_FOR_WRAPAROUND flag, which is passed down by
         * autovacuum; it's used to avoid canceling a vacuum that was invoked
         * in an emergency.
         *
         * Note: these flags remain set until CommitTransaction or
         * AbortTransaction.  We don't want to clear them until we reset
         * MyPgXact->xid/xmin, else OldestXmin might appear to go backwards,
         * which is probably Not Good.
         */
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
        MyPgXact->vacuumFlags |= PROC_IN_VACUUM;
        if (params->is_wraparound)
            MyPgXact->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
        LWLockRelease(ProcArrayLock);
    }


    /*
     * Check for user-requested abort.  Note we want this to be inside a
     * transaction, so xact.c doesn't issue useless WARNING.
     */
    CHECK_FOR_INTERRUPTS();

    /*
     * Determine the type of lock we want --- hard exclusive lock for a FULL
     * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
     * way, we can be sure that no other backend is vacuuming the same table.
     */
    lmode = (options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;

    /*
     * Open the relation and get the appropriate lock on it.
     *
     * There's a race condition here: the rel may have gone away since the
     * last time we saw it.  If so, we don't need to vacuum it.
     *
     * If we've been asked not to wait for the relation lock, acquire it first
     * in non-blocking mode, before calling try_relation_open().
     */
    if (!(options & VACOPT_NOWAIT))
        onerel = try_relation_open(relid, lmode);
    else if (ConditionalLockRelationOid(relid, lmode))
        onerel = try_relation_open(relid, NoLock);
    else
    {
        onerel = NULL;
        if (IsAutoVacuumWorkerProcess() && params->log_min_duration >= 0)
            ereport(LOG,
                    (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
                     errmsg("skipping vacuum of \"%s\" --- lock not available",
                            relation->relname)));
    }

    if (!onerel)
    {
        PopActiveSnapshot();
        CommitTransactionCommand();
        return false;
    }

    /*
     * Check permissions.
     *
     * We allow the user to vacuum a table if he is superuser, the table
     * owner, or the database owner (but in the latter case, only if it's not
     * a shared relation).  pg_class_ownercheck includes the superuser case.
     *
     * Note we choose to treat permissions failure as a WARNING and keep
     * trying to vacuum the rest of the DB --- is this appropriate?
     */
    if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
          (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
    {
        if (onerel->rd_rel->relisshared)
            ereport(WARNING,
                    (errmsg("skipping \"%s\" --- only superuser can vacuum it",
                            RelationGetRelationName(onerel))));
#ifdef _PG_ORCL_
        else if (IsSystemNamespace(onerel->rd_rel->relnamespace))
#else
        else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
#endif
            ereport(WARNING,
                    (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
                            RelationGetRelationName(onerel))));
        else
            ereport(WARNING,
                    (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
                            RelationGetRelationName(onerel))));
        relation_close(onerel, lmode);
        PopActiveSnapshot();
        CommitTransactionCommand();
        return false;
    }

    /*
     * Check that it's a vacuumable relation; we used to do this in
     * get_rel_oids() but seems safer to check after we've locked the
     * relation.
     */
    if (onerel->rd_rel->relkind != RELKIND_RELATION &&
        onerel->rd_rel->relkind != RELKIND_MATVIEW &&
        onerel->rd_rel->relkind != RELKIND_TOASTVALUE &&
        onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    {
        ereport(WARNING,
                (errmsg("skipping \"%s\" --- cannot vacuum non-tables or special system tables",
                        RelationGetRelationName(onerel))));
        relation_close(onerel, lmode);
        PopActiveSnapshot();
        CommitTransactionCommand();
        return false;
    }

    /*
     * Silently ignore tables that are temp tables of other backends ---
     * trying to vacuum these will lead to great unhappiness, since their
     * contents are probably not up-to-date on disk.  (We don't throw a
     * warning here; it would just lead to chatter during a database-wide
     * VACUUM.)
     */
    if (RELATION_IS_OTHER_TEMP(onerel))
    {
        relation_close(onerel, lmode);
        PopActiveSnapshot();
        CommitTransactionCommand();
        return false;
    }

    /*
     * Get a session-level lock too. This will protect our access to the
     * relation across multiple transactions, so that we can vacuum the
     * relation's TOAST table (if any) secure in the knowledge that no one is
     * deleting the parent relation.
     *
     * NOTE: this cannot block, even if someone else is waiting for access,
     * because the lock manager knows that both lock requests are from the
     * same process.
     */
    onerelid = onerel->rd_lockInfo.lockRelId;
    LockRelationIdForSession(&onerelid, lmode);

    /*
     * Remember the relation's TOAST relation for later, if the caller asked
     * us to process it.  In VACUUM FULL, though, the toast table is
     * automatically rebuilt by cluster_rel so we shouldn't recurse to it.
     */
    if (!(options & VACOPT_SKIPTOAST) && !(options & VACOPT_FULL))
        toast_relid = onerel->rd_rel->reltoastrelid;
    else
        toast_relid = InvalidOid;

    /*
     * Switch to the table owner's userid, so that any index functions are run
     * as that user.  Also lock down security-restricted operations and
     * arrange to make GUC variable changes local to this command. (This is
     * unnecessary, but harmless, for lazy VACUUM.)
     */
    GetUserIdAndSecContext(&save_userid, &save_sec_context);
    SetUserIdAndSecContext(onerel->rd_rel->relowner,
                           save_sec_context | SECURITY_RESTRICTED_OPERATION);
    save_nestlevel = NewGUCNestLevel();

    /*
     * Ignore partitioned tables as there is no work to be done.  Since we
     * release the lock here, it's possible that any partitions added from
     * this point on will not get processed, but that seems harmless.
     */
    if (onerel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    {
        /* Roll back any GUC changes executed by index functions */
        AtEOXact_GUC(false, save_nestlevel);

        /* Restore userid and security context */
        SetUserIdAndSecContext(save_userid, save_sec_context);

        relation_close(onerel, NoLock);
        PopActiveSnapshot();
        CommitTransactionCommand();

        /*
         * If a toast relation was recorded above, vacuum it too while we
         * still hold the session lock on the master table.
         */
        if (toast_relid != InvalidOid)
        {
            vacuum_rel(toast_relid, relation, options, params, syncOpt);
        }

        /*
         * Now release the session-level lock on the master table.
         */
        UnlockRelationIdForSession(&onerelid, lmode);

        /* It's OK for other commands to look at this table */
        return true;
    }


#ifdef XCP
    /*
     * If we are on coordinator and target relation is distributed, read
     * the statistics from the data node instead of vacuuming local relation.
     */
    if (IS_PGXC_COORDINATOR && onerel->rd_locator_info)
    {
        vacuum_rel_coordinator(onerel, true, params, syncOpt);
    }
    else
#endif
    /*
     * Do the actual work --- either FULL or "lazy" vacuum
     */
    if (options & VACOPT_FULL)
    {
        /* close relation before vacuuming, but hold lock until commit */
        relation_close(onerel, NoLock);
        onerel = NULL;

        /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
        cluster_rel(relid, InvalidOid, false,
                    (options & VACOPT_VERBOSE) != 0);
    }
    else
        lazy_vacuum_rel(onerel, options, params, vac_strategy);

    /* Roll back any GUC changes executed by index functions */
    AtEOXact_GUC(false, save_nestlevel);

    /* Restore userid and security context */
    SetUserIdAndSecContext(save_userid, save_sec_context);

    /* all done with this class, but hold lock until commit */
    if (onerel)
        relation_close(onerel, NoLock);

    /*
     * Complete the transaction and free all temporary memory used.
     */
    PopActiveSnapshot();
    CommitTransactionCommand();

    /*
     * If the relation has a secondary toast rel, vacuum that too while we
     * still hold the session lock on the master table.  Note however that
     * "analyze" will not get done on the toast table.  This is good, because
     * the toaster always uses hardcoded index access and statistics are
     * totally unimportant for toast relations.
     */
    if (toast_relid != InvalidOid)
        vacuum_rel(toast_relid, relation, options, params, syncOpt);

    /*
     * Now release the session-level lock on the master table.
     */
    UnlockRelationIdForSession(&onerelid, lmode);

    /* Report that we really did it. */
    return true;
}


/*
 * vac_open_indexes --- open every vacuumable index of a relation.
 *
 * Each index of 'relation' is opened with 'lockmode' (which must not be
 * NoLock).  On return, *Irel points to a palloc'd array holding the Relation
 * pointers of the usable indexes (or is NULL when the relation has no indexes
 * at all), and *nindexes is the number of entries stored in that array.
 *
 * An index counts as vacuumable when it is marked insertable (IndexIsReady).
 * One that is not ready was probably left behind by a CREATE INDEX
 * CONCURRENTLY that failed early, and is too corrupt to process; it is closed
 * again immediately.  Indexes that are ready but not indisvalid are still
 * returned, because uniqueness checks run against them regardless and must
 * not trip over dangling index pointers.
 */
void
vac_open_indexes(Relation relation, LOCKMODE lockmode,
                 int *nindexes, Relation **Irel)
{
    List       *index_oids;
    ListCell   *cell;
    int            total;
    int            nready = 0;

    Assert(lockmode != NoLock);

    index_oids = RelationGetIndexList(relation);
    total = list_length(index_oids);

    /* size the result for the worst case: every index turns out to be ready */
    *Irel = (total > 0) ? (Relation *) palloc(total * sizeof(Relation)) : NULL;

    foreach(cell, index_oids)
    {
        Relation    idx = index_open(lfirst_oid(cell), lockmode);

        if (!IndexIsReady(idx->rd_index))
        {
            /* not usable: drop it along with the lock we just took */
            index_close(idx, lockmode);
            continue;
        }

        (*Irel)[nready++] = idx;
    }

    *nindexes = nready;

    list_free(index_oids);
}

/*
 * vac_close_indexes --- undo vac_open_indexes.
 *
 * Closes the 'nindexes' relations stored in 'Irel', last entry first, and
 * then frees the array itself.  'lockmode' is handed to index_close for each
 * one; pass NoLock to keep the locks.  A NULL 'Irel' is a no-op.
 */
void
vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
{
    int            remaining = nindexes;

    if (Irel == NULL)
        return;

    while (remaining != 0)
    {
        remaining--;
        index_close(Irel[remaining], lockmode);
    }

    pfree(Irel);
}

/*
 * vacuum_delay_point --- check for interrupts and cost-based delay.
 *
 * Meant to be called from each major loop of VACUUM processing, typically
 * once per page processed.  Interrupts are always serviced; a sleep happens
 * only when cost-based delay is active, no interrupt is pending, and the
 * accumulated cost balance has reached the configured limit.
 */
void
vacuum_delay_point(void)
{
    int            nap_ms;
    int            nap_cap;

    /* Always check for interrupts */
    CHECK_FOR_INTERRUPTS();

    /* Nothing more to do unless a nap is due */
    if (!VacuumCostActive || InterruptPending ||
        VacuumCostBalance < VacuumCostLimit)
        return;

    /* Scale the delay by how far the balance overshot, capped at 4x */
    nap_ms = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
    nap_cap = VacuumCostDelay * 4;
    if (nap_ms > nap_cap)
        nap_ms = nap_cap;

    pg_usleep(nap_ms * 1000L);

    VacuumCostBalance = 0;

    /* update balance values for workers */
    AutoVacuumUpdateDelay();

    /* Might have gotten an interrupt while sleeping */
    CHECK_FOR_INTERRUPTS();
}

#ifdef XCP
/*
 * make_relation_tle --- build a TargetEntry for one column of a relation.
 *
 * Looks up attribute 'column' of relation 'reloid' in the syscache and
 * returns a TargetEntry wrapping a Var (varno 1) for it; the attribute number
 * doubles as the entry's resno.  'relname' is used only in the error message
 * raised when the column does not exist.  Used for the data node query
 * against the pg_class catalog table.
 */
TargetEntry *
make_relation_tle(Oid reloid, const char *relname, const char *column)
{
    HeapTuple    attrow;
    Form_pg_attribute attform;
    Var           *colref;
    TargetEntry *entry;

    attrow = SearchSysCacheAttName(reloid, column);
    if (!HeapTupleIsValid(attrow))
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_COLUMN),
                 errmsg("column \"%s\" of relation \"%s\" does not exist",
                        column, relname)));

    attform = (Form_pg_attribute) GETSTRUCT(attrow);

    colref = makeVar(1, attform->attnum, attform->atttypid,
                     attform->atttypmod, InvalidOid, 0);
    entry = makeTargetEntry((Expr *) colref, attform->attnum, NULL, false);

    /* the Var and TargetEntry hold copies, so the cache entry can go now */
    ReleaseSysCache(attrow);

    return entry;
}


/*
 * get_remote_relstat --- fetch pg_class statistics of one relation from the
 * data nodes and combine them.
 *
 *	nspname, relname	schema and relation name to look up remotely
 *	replicated			if true, per-node counts are averaged instead of summed
 *	pages				out: combined relpages
 *	allvisiblepages		out: combined relallvisible
 *	tuples				out: combined reltuples
 *	frozenXid			out: lowest valid relfrozenxid reported, or
 *						InvalidTransactionId (see below)
 *
 * Each returned row contributes its non-NULL columns.  Counts are summed;
 * for a replicated relation each sum is then divided by the number of rows
 * that supplied that column.
 *
 * Returns the number of nodes that returned usable statistics.  If fewer
 * rows carried a valid relfrozenxid than carried relpages or reltuples,
 * *frozenXid is reset to InvalidTransactionId and the larger of the
 * relpages/reltuples row counts is returned; otherwise the number of valid
 * relfrozenxid values is returned.
 *
 * NOTE(review): nspname and relname are interpolated into the query text
 * without quoting, so a name containing a single quote produces a malformed
 * remote query.  Quoting them as SQL literals would fix this -- TODO.
 */
static int
get_remote_relstat(char *nspname, char *relname, bool replicated,
                   int32 *pages, int32 *allvisiblepages,
                   float4 *tuples, TransactionId *frozenXid)
{// #lizard forgives
    StringInfoData query;
    EState        *estate;
    MemoryContext oldcontext;
    RemoteQuery *step;
    RemoteQueryState *node;
    TupleTableSlot *result;
    int            validpages,
                validtuples,
                validfrozenxids,
                validallvisiblepages;

    /* Make up query string */
    initStringInfo(&query);
    appendStringInfo(&query, "SELECT c.relpages, "
                                    "c.reltuples, "
                                    "c.relallvisible, "
                                    "c.relfrozenxid "
                             "FROM pg_class c JOIN pg_namespace n "
                             "ON c.relnamespace = n.oid "
                             "WHERE n.nspname = '%s' "
                             "AND c.relname = '%s'",
                     nspname, relname);

    /* Build up RemoteQuery */
    step = makeNode(RemoteQuery);

    step->combine_type = COMBINE_TYPE_NONE;
    step->exec_nodes = NULL;
    step->sql_statement = query.data;
    step->force_autocommit = true;
    step->exec_type = EXEC_ON_DATANODES;

    /* Add targetlist entries, in the same order as the SELECT list */
    step->scan.plan.targetlist = lappend(step->scan.plan.targetlist,
                                         make_relation_tle(RelationRelationId,
                                                           "pg_class",
                                                           "relpages"));
    step->scan.plan.targetlist = lappend(step->scan.plan.targetlist,
                                         make_relation_tle(RelationRelationId,
                                                           "pg_class",
                                                           "reltuples"));
    step->scan.plan.targetlist = lappend(step->scan.plan.targetlist,
                                         make_relation_tle(RelationRelationId,
                                                           "pg_class",
                                                           "relallvisible"));
    step->scan.plan.targetlist = lappend(step->scan.plan.targetlist,
                                         make_relation_tle(RelationRelationId,
                                                           "pg_class",
                                                           "relfrozenxid"));

    /* Execute query on the data nodes */
    estate = CreateExecutorState();

    /* The remote query state is built inside the executor's query context */
    oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);

    estate->es_snapshot = GetActiveSnapshot();

    node = ExecInitRemoteQuery(step, estate, 0);
    MemoryContextSwitchTo(oldcontext);

    /* get ready to combine results */
    *pages = 0;
    *allvisiblepages = 0;
    *tuples = 0.0;
    *frozenXid = InvalidTransactionId;
    validpages = 0;
    validallvisiblepages = 0;
    validtuples = 0;
    validfrozenxids = 0;

    result = ExecRemoteQuery((PlanState *) node);
    while (result != NULL && !TupIsNull(result))
    {
        Datum     value;
        bool    isnull;

        /* Process statistics from the data node */
        value = slot_getattr(result, 1, &isnull); /* relpages */
        if (!isnull)
        {
            validpages++;
            *pages += DatumGetInt32(value);
        }
        value = slot_getattr(result, 2, &isnull); /* reltuples */
        if (!isnull)
        {
            validtuples++;
            *tuples += DatumGetFloat4(value);
        }
        value = slot_getattr(result, 3, &isnull); /* relallvisible */
        if (!isnull)
        {
            validallvisiblepages++;
            *allvisiblepages += DatumGetInt32(value);
        }
        value = slot_getattr(result, 4, &isnull); /* relfrozenxid */
        if (!isnull)
        {
            /*
             * relfrozenxid on coordinator should be the lowest one from the
             * datanodes.
             */
            TransactionId xid = DatumGetTransactionId(value);

            if (TransactionIdIsValid(xid))
            {
                validfrozenxids++;
                if (!TransactionIdIsValid(*frozenXid) ||
                        TransactionIdPrecedes(xid, *frozenXid))
                {
                    *frozenXid = xid;
                }
            }
        }
        /* fetch next */
        result = ExecRemoteQuery((PlanState *) node);
    }
    ExecEndRemoteQuery(node);

    /*
     * Release the executor state and its per-query memory context.  All
     * values of interest were already copied into the caller's output
     * variables, so nothing below refers to executor memory.
     */
    FreeExecutorState(estate);

    if (replicated)
    {
        /*
         * Normally numbers should be the same on the nodes, but relations
         * are autovacuum'ed independently, so they may differ.
         * Average is good enough approximation in this case.
         */
        if (validpages > 0)
            *pages /= validpages;

        if (validtuples > 0)
            *tuples /= validtuples;

        if (validallvisiblepages > 0)
            *allvisiblepages /= validallvisiblepages;
    }

    if (validfrozenxids < validpages || validfrozenxids < validtuples)
    {
        /*
         * If some node returned invalid value for frozenxid we can not set
         * it on coordinator. There are other cases when returned value of
         * frozenXid should be ignored, these cases are checked by caller.
         * Basically, to be sure, there should be one value from each node,
         * where the table is partitioned.
         */
        *frozenXid = InvalidTransactionId;
        return Max(validpages, validtuples);
    }
    else
    {
        return validfrozenxids;
    }
}


/*
 * sync_remote_relstat --- read one relation's pg_class statistics through
 * the remote query that init_sync_remotequery() sets up for syncOpt.
 *
 *	nspname, relname	schema and relation name to look up remotely
 *	replicated			accepted for symmetry with get_remote_relstat();
 *						not used here
 *	pages				out: relpages
 *	allvisiblepages		out: relallvisible
 *	tuples				out: reltuples
 *	frozenXid			out: relfrozenxid, if valid
 *	syncOpt				passed to init_sync_remotequery()
 *
 * Only the first row returned is examined.  Every output is reset before
 * the query runs, so a missing row or a NULL column leaves the
 * corresponding output at zero / InvalidTransactionId.
 */
static void
sync_remote_relstat(char *nspname, char *relname, bool replicated,
				   int32 *pages, int32 *allvisiblepages,
					float4 *tuples, TransactionId *frozenXid, StatSyncOpt *syncOpt)
{
	/* pg_class columns fetched, in SELECT-list order */
	static const char *const stat_columns[] = {
		"relpages",
		"reltuples",
		"relallvisible",
		"relfrozenxid"
	};

	StringInfoData sql;
	char	   *cnname;
	RemoteQuery *plan;
	RemoteQueryState *qstate;
	EState	   *estate;
	MemoryContext callercxt;
	TupleTableSlot *row;
	int			i;

	/* Compose the statement text */
	initStringInfo(&sql);
	appendStringInfo(&sql,
					 "SELECT c.relpages, c.reltuples, c.relallvisible, c.relfrozenxid "
					 "FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid "
					 "WHERE n.nspname = '%s' AND c.relname = '%s'",
					 nspname, relname);

	/* Let the sync helper create the RemoteQuery, then attach the text */
	plan = init_sync_remotequery(syncOpt, &cnname);
	plan->sql_statement = sql.data;
	plan->force_autocommit = true;

	/* One target list entry per fetched column */
	for (i = 0; i < lengthof(stat_columns); i++)
	{
		TargetEntry *tle = make_relation_tle(RelationRelationId,
											 "pg_class",
											 stat_columns[i]);

		plan->scan.plan.targetlist = lappend(plan->scan.plan.targetlist, tle);
	}

	/* Set up execution; the query state lives in the executor's context */
	estate = CreateExecutorState();
	callercxt = MemoryContextSwitchTo(estate->es_query_cxt);
	qstate = ExecInitRemoteQuery(plan, estate, 0);
	MemoryContextSwitchTo(callercxt);

	/* Defaults reported when no usable value comes back */
	*pages = 0;
	*allvisiblepages = 0;
	*tuples = 0.0;
	*frozenXid = InvalidTransactionId;

	/* Fetch a single row and pick out its non-NULL columns */
	row = ExecRemoteQuery((PlanState *) qstate);
	if (!TupIsNull(row))
	{
		Datum		datum;
		bool		is_null;

		datum = slot_getattr(row, 1, &is_null);		/* relpages */
		if (!is_null)
			*pages = DatumGetInt32(datum);

		datum = slot_getattr(row, 2, &is_null);		/* reltuples */
		if (!is_null)
			*tuples = DatumGetFloat4(datum);

		datum = slot_getattr(row, 3, &is_null);		/* relallvisible */
		if (!is_null)
			*allvisiblepages = DatumGetInt32(datum);

		datum = slot_getattr(row, 4, &is_null);		/* relfrozenxid */
		if (!is_null && TransactionIdIsValid(DatumGetTransactionId(datum)))
			*frozenXid = DatumGetTransactionId(datum);
	}

	ExecEndRemoteQuery(qstate);
	FreeExecutorState(estate);
}

/*
 * vacuum_rel_coordinator --- refresh the coordinator's pg_class statistics
 * for a relation and its indexes.
 *
 * Coordinator does not contain any data, so we never need to vacuum
 * relations.  This function only updates optimizer statistics based on
 * info obtained remotely.
 *
 *	onerel		relation to update
 *	is_outer	passed through to vac_update_relstats()
 *	params		VACUUM parameters; may be NULL
 *	syncOpt		if non-NULL with is_sync_from set, and the relation is not
 *				coordinator-local, statistics are read with
 *				sync_remote_relstat() instead of get_remote_relstat()
 *
 * When statistics come from get_remote_relstat(), nothing is updated unless
 * at least one node answered, and relfrozenxid is discarded unless the
 * number of answers reaches the length of the relation's node list.
 */
void
vacuum_rel_coordinator(Relation onerel, bool is_outer, VacuumParams *params, StatSyncOpt *syncOpt)
{
    char       *rel_nsp;
    char       *rel_name;
    /* combined statistics of the relation itself */
    int32       rel_pages;
    int32       rel_allvisible;
    float4      rel_tuples;
    TransactionId rel_frozenxid;
    bool        replicated;
    bool        from_sync;
    int         answered = 0;
    int         expected;
    int         nindexes;
    Relation   *Irel;
    int         i;
#ifdef __OPENTENBASE__
    TransactionId oldestXmin = InvalidTransactionId;
    TransactionId freezeLimit = InvalidTransactionId;
    MultiXactId multiXactCutoff = InvalidMultiXactId;
#endif

    /* Identify the relation */
    rel_name = RelationGetRelationName(onerel);
    rel_nsp = get_namespace_name(RelationGetNamespace(onerel));

    elog(DEBUG5, "Getting relation statistics for %s.%s", rel_nsp, rel_name);

#ifdef __OPENTENBASE__
    /* Compute the freeze limit, except for partitioned parent tables */
    if (params && onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    {
        vacuum_set_xid_limits(onerel,
                              params->freeze_min_age,
                              params->freeze_table_age,
                              params->multixact_freeze_min_age,
                              params->multixact_freeze_table_age,
                              &oldestXmin, &freezeLimit, NULL,
                              &multiXactCutoff, NULL);
    }
#endif

    replicated = IsLocatorReplicated(RelationGetLocatorType(onerel));

    /* Decide where the statistics come from */
    from_sync = (syncOpt != NULL &&
                 syncOpt->is_sync_from == true &&
                 !RELATION_IS_COORDINATOR_LOCAL(onerel));

    if (from_sync)
    {
        sync_remote_relstat(rel_nsp, rel_name, replicated,
                            &rel_pages, &rel_allvisible,
                            &rel_tuples, &rel_frozenxid,
                            syncOpt);
    }
    else
    {
        /* Returns the number of nodes that reported correct stats */
        answered = get_remote_relstat(rel_nsp, rel_name, replicated,
                                      &rel_pages, &rel_allvisible,
                                      &rel_tuples, &rel_frozenxid);
    }

    /* Without any answer there is nothing to record */
    if (!from_sync && answered <= 0)
        return;

    expected = list_length(RelationGetLocInfo(onerel)->rl_nodeList);

    vac_open_indexes(onerel, ShareUpdateExclusiveLock, &nindexes, &Irel);

    /* Refresh the statistics of each index */
    for (i = 0; i < nindexes; i++)
    {
        char       *idx_name = RelationGetRelationName(Irel[i]);
        char       *idx_nsp = get_namespace_name(RelationGetNamespace(Irel[i]));
        int32       idx_pages;
        int32       idx_allvisible;     /* fetched but not stored */
        float4      idx_tuples;
        TransactionId idx_frozenxid;
        int         idx_answered = 0;

        /* Index is replicated if parent relation is replicated */
        if (from_sync)
        {
            sync_remote_relstat(idx_nsp, idx_name, replicated,
                                &idx_pages, &idx_allvisible,
                                &idx_tuples, &idx_frozenxid,
                                syncOpt);
        }
        else
        {
            idx_answered = get_remote_relstat(idx_nsp, idx_name, replicated,
                                              &idx_pages, &idx_allvisible,
                                              &idx_tuples, &idx_frozenxid);
        }

        /* Skip an index for which no node answered */
        if (!from_sync && idx_answered <= 0)
            continue;

        /* An incomplete set of answers cannot vouch for the frozenxid */
        if (!from_sync && idx_answered < expected)
            idx_frozenxid = InvalidTransactionId;

        /* save changes */
        /* !!TODO Get multi-xid from remote nodes */
        vac_update_relstats(Irel[i],
                            (BlockNumber) idx_pages,
                            (double) idx_tuples,
                            0,
                            false,
                            idx_frozenxid,
                            InvalidMultiXactId,
                            is_outer);
    }

    /* Done with indexes */
    vac_close_indexes(nindexes, Irel, NoLock);

    /* Same completeness rule for the relation's own frozenxid */
    if (!from_sync && answered < expected)
        rel_frozenxid = InvalidTransactionId;

#ifdef __OPENTENBASE__
    /* A locally computed freeze limit, when available, takes precedence */
    if (TransactionIdIsValid(freezeLimit))
        rel_frozenxid = freezeLimit;
#endif

    /* save changes */
    vac_update_relstats(onerel,
                        (BlockNumber) rel_pages,
                        (double) rel_tuples,
                        rel_allvisible,
                        (nindexes > 0),
                        rel_frozenxid,
                        InvalidMultiXactId,
                        is_outer);
}
#endif

#ifdef _SHARDING_
/*
 * ExecVacuumShard --- process a VacuumShardStmt.
 *
 * Resolves stmt->relation to a list of relation OIDs with get_rel_oids(),
 * then calls TruncateShard() for every shard id in stmt->shards on every
 * one of those relations, reporting the tuple count it returns at INFO
 * level.
 *
 * The transaction command that is open on entry is committed before the
 * shard loop, and a new one is started after it, so the TruncateShard()
 * calls run outside of it.  No relation is opened or locked here; only
 * the OID is handed to TruncateShard().
 *
 * "tuples" is never reset between relations, so the per-relation message
 * reports the running total over all relations processed so far.
 */
void
ExecVacuumShard(VacuumShardStmt *stmt)
{
    List       *relations = NULL;
    ListCell   *cur = NULL;
    int         tuples = 0;

    vac_context = AllocSetContextCreate(PortalContext,
                                        "Vacuum",
                                        ALLOCSET_DEFAULT_SIZES);
    relations = get_rel_oids(InvalidOid, stmt->relation);

    /* End the caller's transaction command before touching the shards */
    if (ActiveSnapshotSet())
        PopActiveSnapshot();
    CommitTransactionCommand();

    foreach(cur, relations)
    {
        Oid         relid = lfirst_oid(cur);
        ListCell   *lc_shard;

        /* for every shard */
        foreach(lc_shard, stmt->shards)
        {
            A_Const    *con = (A_Const *) lfirst(lc_shard);
            ShardID     sid = intVal(&(con->val));
            int         s_tuples;

            s_tuples = TruncateShard(relid, sid, stmt->pause);

            /* Oid is unsigned, so it must be printed with %u */
            elog(INFO, "Vacuum One Shard Success. rel=%u, sid=%d, tuples=%d",
                        relid, sid, s_tuples);
            tuples += s_tuples;
        }

        elog(INFO, "Vacuum Shard Success. rel=%u, tuples=%d",
                        relid, tuples);
    }

    /* Hand an open transaction command back to the caller */
    StartTransactionCommand();
    MemoryContextDelete(vac_context);
    vac_context = NULL;
}
#endif
